#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#include "jc_pty.h"
#include "config_parser.h"
#if !defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif

#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#include "mqtt_config.h"

// ---- Module state shared between the public API and the Paho callbacks ----

// Set to 1 by onDisconnect()/onDisconnectFailure(); polled by stop_loop().
int disc_finished = 0;
// Set to 1 by onSubscribe() once a subscribe request has succeeded.
int subscribed = 0;
// Set to 1 by onConnectFailure(), and by connlost() when a reconnect cannot be started.
int mqtt_connection_finished = 0;
// When false, onConnect() skips the automatic subscriptions.
bool auto_sub_after_connecting = true;

// Connect options: filled in by start_loop() and reused by connlost() for reconnecting.
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
string mqtt_server;
string client_id;
string target;
string mqtt_username;
string mqtt_password;
// Default broker port. Was 1833 (typo); 1883 matches the default used by
// read_config()/read_config_slaver() and the address example in start_loop().
int mqtt_port = 1883;
int mqtt_keep_alive = 120;
string pub_topic;
string sub_topic;
string sub_topic2;
// Only set by read_config_slaver(); onConnect() skips it while it is empty.
string sub_topic3;
MQTTAsync client;
// Single message object reused by every publish helper in this file.
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
on_message_callback_t _on_message_callback = NULL;
on_error_callback_t _on_error_callback = NULL;

// Forward declarations of the MQTTAsync callbacks defined in this file, so that
// they can be referenced (e.g. from start_loop/onConnect/stop_loop) before their
// definitions appear.
void onConnect(void *context, MQTTAsync_successData *response);
void onConnectFailure(void *context, MQTTAsync_failureData *response);
void onSubscribe(void *context, MQTTAsync_successData *response);
void onSubscribeFailure(void *context, MQTTAsync_failureData *response);
void onDisconnect(void *context, MQTTAsync_successData *response);
void onDisconnectFailure(void *context, MQTTAsync_failureData *response);
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
void connlost(void *context, char *cause);

// Connection-lost callback (registered in start_loop() via MQTTAsync_setCallbacks).
//
// Logs the loss and immediately tries to start a new connection with the
// shared conn_opts. If the reconnect cannot even be started, the error callback
// (if any) is notified with MQTT_Connection_Lost and mqtt_connection_finished
// is set.
//
// context: the MQTTAsync handle that start_loop() registered as callback context.
// cause:   optional description of the loss; may be NULL.
void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;

	printf("\nConnection lost\n");
	if (cause)
		printf("     cause: %s\n", cause);

	// Keep the return code: the original printed an uninitialized variable here.
	const int rc = MQTTAsync_connect(client, &conn_opts);
	if (rc != MQTTASYNC_SUCCESS)
	{
		if (_on_error_callback)
		{
			_on_error_callback(MQTT_Connection_Lost);
		}
		printf("Failed to start connect, return code %d\n", rc);
		mqtt_connection_finished = 1;
	}
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
	if (_on_message_callback)
	{
		_on_message_callback(topicName, topicLen, (char *)message->payload, message->payloadlen);
	}
	MQTTAsync_freeMessage(&message);
	MQTTAsync_free(topicName);
	return 1;
}

// Failure callback for the disconnect request issued by stop_loop().
// Sets disc_finished so that the wait loop in stop_loop() terminates even
// though the disconnect did not succeed. Both parameters are unused.
void onDisconnectFailure(void *context, MQTTAsync_failureData *response)
{
	disc_finished = 1;
}

// Failure callback for the subscribe requests issued by onConnect().
// Logs the failure code reported by the library.
//
// response: failure details; guarded against NULL (0 is printed in that case)
//           so a missing failure record cannot crash the callback thread.
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
{
	printf("Subscribe failed, rc %d\n", response ? response->code : 0);
}

// Validates a mandatory configuration value.
//
// Throws std::runtime_error when val is empty; otherwise does nothing.
// The message is "config value is empty" immediately followed by the source
// line number of the throw statement below (not of the caller).
void assert_not_empty(string &val)
{
	if (!val.empty())
	{
		return;
	}
	throw std::runtime_error("config value is empty" + to_string(__LINE__));
}

int read_config(string &f_name)
{
	auto config = config_parse(f_name);
	mqtt_server = config_get(config, "mqtt", "server", "aahqtdc.iot.gz.baidubce.com");
	client_id = config_get(config, "mqtt", "client_id");
	assert_not_empty(client_id);
	target = config_get(config, "mqtt", "target");
	assert_not_empty(target);
	mqtt_username = "thingidp@aahqtdc|" + client_id;
	mqtt_password = config_get(config, "mqtt", "password");
	assert_not_empty(mqtt_password);
	string _port = config_get(config, "mqtt", "port", "1883");
	mqtt_port = std::stoi(_port);
	string _keep_alive = config_get(config, "mqtt", "keep_alive", "120");
	mqtt_keep_alive = std::stoi(_keep_alive);
	pub_topic = "lily/" + target + "/cmd";
	sub_topic = "lily/" + target + "/msg";
	sub_topic2 = "lily/" + target + "/pong";
	return 0;
}
int read_config_slaver(string &f_name)
{
	auto config = config_parse(f_name);
	string sector = "jc-pi";
	mqtt_server = config_get(config, sector, "server", "aahqtdc.iot.gz.baidubce.com");
	client_id = config_get(config, sector, "client_id");
	assert_not_empty(client_id);
	target = client_id;
	mqtt_username = "thingidp@aahqtdc|" + client_id;
	mqtt_password = config_get(config, sector, "password");
	assert_not_empty(mqtt_password);
	string _port = config_get(config, sector, "port", "1883");
	mqtt_port = std::stoi(_port);
	string _keep_alive = config_get(config, sector, "keep_alive", "120");
	mqtt_keep_alive = std::stoi(_keep_alive);
	pub_topic = "lily/" + client_id + "/msg";
	sub_topic = "lily/" + client_id + "/cmd";
	sub_topic2 = "lily/" + client_id + "/ping";
	sub_topic3 = "lily/" + client_id + "/ext";
	return 0;
}

void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
	mqtt_connection_finished = 1;
	if (_on_error_callback)
	{
		_on_error_callback(MQTT_Connection_Failure);
	}
}

void onConnect(void *context, MQTTAsync_successData *response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

	opts.onSuccess = onSubscribe;
	opts.onFailure = onSubscribeFailure;
	opts.context = client;
	if (_on_error_callback)
	{
		_on_error_callback(MQTT_Connection_OK);
	}
	if (!auto_sub_after_connecting)
	{
		return;
	}
	if (MQTTAsync_subscribe(client, sub_topic.c_str(), 0, &opts) != MQTTASYNC_SUCCESS)
	{
		cout << "subscribe failed:" << sub_topic << endl;
	}
	if (MQTTAsync_subscribe(client, sub_topic2.c_str(), 0, &opts) != MQTTASYNC_SUCCESS)
	{
		cout << "subscribe failed:" << sub_topic2 << endl;
	}
	if (sub_topic3.empty())
		return;
	if (MQTTAsync_subscribe(client, sub_topic3.c_str(), 0, &opts) != MQTTASYNC_SUCCESS)
	{
		cout << "subscribe failed:" << sub_topic3 << endl;
	}
}
// Requests a QoS 0 subscription to topic on the global client, without
// completion callbacks.
// Returns 0 when the request was accepted by the library, -1 otherwise.
int subscribe_topic(string &topic)
{
	const int status = MQTTAsync_subscribe(client, topic.c_str(), 0, NULL);
	return (status == MQTTASYNC_SUCCESS) ? 0 : -1;
}

// Creates the global MQTT client for tcp://<mqtt_server>:<mqtt_port>, installs
// the connection-lost and message-arrived callbacks and starts an asynchronous
// connect using the credentials and keep-alive held in the module globals.
//
// Returns 0 once the connect request has been started; the outcome is reported
// later through onConnect()/onConnectFailure(). On failure the library's return
// code is printed, the client is destroyed if it had been created, and a
// non-zero value (the source line of the failing step) is returned.
int start_loop()
{
	// e.g. tcp://aahqtdc.iot.gz.baidubce.com:1883
	const string address = "tcp://" + mqtt_server + ":" + to_string(mqtt_port);

	int status = MQTTAsync_create(&client, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
	if (status != MQTTASYNC_SUCCESS)
	{
		printf("Failed to create client, return code %d\n", status);
		return __LINE__;
	}

	// The client handle doubles as the context passed to connlost/msgarrvd.
	status = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL);
	if (status != MQTTASYNC_SUCCESS)
	{
		printf("Failed to set callbacks, return code %d\n", status);
		MQTTAsync_destroy(&client);
		return __LINE__;
	}

	conn_opts.keepAliveInterval = mqtt_keep_alive;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	// These point into the global strings, which must stay unchanged while
	// conn_opts is in use (connlost() reuses it for reconnects).
	conn_opts.username = mqtt_username.c_str();
	conn_opts.password = mqtt_password.c_str();
	conn_opts.context = client;

	status = MQTTAsync_connect(client, &conn_opts);
	if (status != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", status);
		MQTTAsync_destroy(&client);
		return __LINE__;
	}

	// Defaults for every message sent through the shared pubmsg.
	pubmsg.qos = 0;
	pubmsg.retained = 0;
	return 0;
}
// Installs the handler that msgarrvd() invokes for every received message.
// Passing NULL removes the handler (msgarrvd() checks for it before calling).
void set_on_message_callback(on_message_callback_t cb)
{
	_on_message_callback = cb;
}
// Installs the handler that receives connection status notifications
// (MQTT_Connection_OK, MQTT_Connection_Failure, MQTT_Connection_Lost) from the
// connect/connection-lost callbacks in this file. Passing NULL removes it.
void set_on_error_callback(on_error_callback_t cb)
{
	_on_error_callback = cb;
}
// Publishes the contents of msg to the configured pub_topic using the shared
// pubmsg object.
// Returns 0 when the library accepted the message, otherwise a non-zero value
// (the source line of the failing return).
int mqtt_out_raw(string &msg)
{
	// The message struct takes a mutable pointer, hence the const removal.
	pubmsg.payload = const_cast<char *>(msg.c_str());
	pubmsg.payloadlen = msg.length();

	const int status = MQTTAsync_sendMessage(client, pub_topic.c_str(), &pubmsg, NULL);
	if (status == MQTTASYNC_SUCCESS)
	{
		return 0;
	}
	return __LINE__;
}
// Publishes len bytes starting at msg to the configured pub_topic using the
// shared pubmsg object.
// Returns 0 when the library accepted the message, otherwise a non-zero value
// (the source line of the failing return).
int mqtt_out_raw(const char *msg, int len)
{
	// The message struct takes a mutable pointer, hence the const removal.
	pubmsg.payload = const_cast<char *>(msg);
	pubmsg.payloadlen = len;

	const int status = MQTTAsync_sendMessage(client, pub_topic.c_str(), &pubmsg, NULL);
	if (status == MQTTASYNC_SUCCESS)
	{
		return 0;
	}
	return __LINE__;
}
// Convenience overload: forwards to client_publish(const char *, string &)
// with the C string of topic, and returns its result.
int client_publish(string &topic, string &msg)
{
	return client_publish(topic.c_str(), msg);
}
// Publishes the contents of msg to lily/<target>/<topic> using the shared
// pubmsg object. topic is only the last path segment and must not be NULL.
// Returns 0 when the library accepted the message, otherwise a non-zero value
// (the source line of the failing return).
int client_publish(const char *topic, string &msg)
{
	// The message struct takes a mutable pointer, hence the const removal.
	pubmsg.payload = const_cast<char *>(msg.c_str());
	pubmsg.payloadlen = msg.length();

	// Full topic for this call only; the global pub_topic is left untouched.
	const string full_topic = "lily/" + target + "/" + topic;
	const int status = MQTTAsync_sendMessage(client, full_topic.c_str(), &pubmsg, NULL);
	if (status == MQTTASYNC_SUCCESS)
	{
		return 0;
	}
	return __LINE__;
}
// Convenience overload: forwards to client_publish(const char *, const char *, int)
// with the C string of topic, and returns its result.
int client_publish(string &topic, const char *msg, int len)
{
	return client_publish(topic.c_str(), msg, len);
}
// Publishes len bytes starting at msg to lily/<target>/<topic> using the shared
// pubmsg object. topic is only the last path segment and must not be NULL.
// Returns 0 when the library accepted the message, otherwise a non-zero value
// (the source line of the failing return).
int client_publish(const char *topic, const char *msg, int len)
{
	// The message struct takes a mutable pointer, hence the const removal.
	pubmsg.payload = const_cast<char *>(msg);
	pubmsg.payloadlen = len;

	// Full topic for this call only; the global pub_topic is left untouched.
	const string full_topic = "lily/" + target + "/" + topic;
	const int status = MQTTAsync_sendMessage(client, full_topic.c_str(), &pubmsg, NULL);
	if (status == MQTTASYNC_SUCCESS)
	{
		return 0;
	}
	return __LINE__;
}
// Success callback for the disconnect request issued by stop_loop().
// Sets disc_finished, which ends the wait loop in stop_loop().
// Both parameters are unused.
void onDisconnect(void *context, MQTTAsync_successData *response)
{
	disc_finished = 1;
}

// Success callback for the subscribe requests issued by onConnect().
// Records the success in the global subscribed flag. Both parameters are unused.
void onSubscribe(void *context, MQTTAsync_successData *response)
{
	subscribed = 1;
}

// Disconnects the global client, waits until the disconnect has completed
// (successfully or not) and destroys the client.
//
// Returns 0 when the disconnect request was started, otherwise a non-zero value
// (the source line of the failing step). The client is destroyed in both cases.
int stop_loop()
{
	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
	int rc = 0;
	disc_opts.onSuccess = onDisconnect;
	disc_opts.onFailure = onDisconnectFailure;

	// Clear a completion flag left over from an earlier start/stop cycle;
	// otherwise the wait below would be skipped and the client destroyed while
	// this disconnect is still in flight.
	disc_finished = 0;

	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
	{
		rc = __LINE__;
	}
	else
	{
		// onDisconnect()/onDisconnectFailure() set disc_finished from the
		// library's callback thread.
		while (!disc_finished)
		{
#if defined(_WIN32)
			Sleep(100);
#else
			usleep(10000L);
#endif
		}
	}

	MQTTAsync_destroy(&client);
	return rc;
}

#if TESTSUIT == TESTSUIT_MQTT_CONFIG
// Message handler used by the test program: writes the payload to stdout.
// At most msg_len characters of msg are printed (printing stops early at a NUL
// byte); no newline is appended. topic and topic_len are ignored.
void print_on_the_message(char *topic, int topic_len, char *msg, int msg_len)
{
	printf("%.*s", msg_len, msg);
}
// Interactive test program: loads the config (default ../mqtt.ini, or argv[1]),
// connects, publishes a "ping" and then publishes every line read from stdin
// to pub_topic until "exit" is entered or stdin reaches end-of-file.
//
// Exits with the non-zero code of read_config()/start_loop() when either fails.
int main(int argn, char *argv[])
{
	string config_path = "../mqtt.ini";
	if (argn > 1)
	{
		config_path = argv[1];
	}

	int rc = read_config(config_path);
	if (rc)
		exit(rc);

	// Install the handler before connecting so that no incoming message can
	// reach msgarrvd() while the handler is still unset.
	set_on_message_callback(print_on_the_message);

	rc = start_loop();
	if (rc)
		exit(rc);

	string ping = "ping";
	// NOTE(review): the connect started by start_loop() completes asynchronously
	// (see onConnect), so this publish may be issued before the connection is
	// up - confirm whether a wait is needed here.
	// usleep(3000000L);
	client_publish(ping, ping);

	// Stop on EOF/read error as well: ignoring getline's result made the loop
	// spin forever publishing empty messages once stdin was closed.
	string cmd;
	while (getline(cin, cmd))
	{
		if (cmd == "exit")
			break;
		mqtt_out_raw(cmd);
	}
	stop_loop();
	return 0;
}
#endif